跳到主要内容

Golang 中使用 ES

使用 Golang 操作 ES,需要使用官方提供的 go-elasticsearch

基本的 CRUD 操作

package main

import (
"context"
"encoding/json"
"log"
"strings"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)

func main() {
// 设置Elasticsearch连接参数
cfg := elasticsearch.Config{
Addresses: []string{
"http://es17-dev.maizuo.com:9200",
},
}

// 创建Elasticsearch客户端
client, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("无法创建Elasticsearch客户端:%v", err)
}

// 创建一个新的索引
indexName := "sample_index"
createIndex(client, indexName)

// 插入文档
docID := "1"
document := map[string]interface{}{
"title": "示例文档",
"content": "这是一个示例文档的内容。",
}
insertDocument(client, indexName, docID, document)

// 获取文档
retrievedDoc, err := getDocument(client, indexName, docID)
if err != nil {
log.Printf("无法获取文档:%v", err)
} else {
log.Printf("检索到的文档:%v", retrievedDoc)
}

// 删除索引
deleteIndex(client, indexName)
}

func createIndex(client *elasticsearch.Client, indexName string) {
ctx := context.Background()
createIndexRequest := esapi.IndicesCreateRequest{
Index: indexName,
}

res, err := createIndexRequest.Do(ctx, client)
if err != nil {
log.Fatalf("无法创建索引:%v", err)
}
defer res.Body.Close()

if res.IsError() {
log.Fatalf("创建索引失败:%s", res.String())
}

log.Printf("索引 %s 创建成功", indexName)
}

func insertDocument(client *elasticsearch.Client, indexName, docID string, document map[string]interface{}) {
ctx := context.Background()
jsonData, err := json.Marshal(document)
if err != nil {
log.Fatalf("无法序列化文档:%v", err)
}

insertRequest := esapi.IndexRequest{
Index: indexName,
DocumentID: docID,
Body: strings.NewReader(string(jsonData)),
Refresh: "true",
}

res, err := insertRequest.Do(ctx, client)
if err != nil {
log.Fatalf("无法插入文档:%v", err)
}
defer res.Body.Close()

if res.IsError() {
log.Fatalf("插入文档失败:%s", res.String())
}

log.Printf("文档插入成功")
}

func getDocument(client *elasticsearch.Client, indexName, docID string) (map[string]interface{}, error) {
ctx := context.Background()
getRequest := esapi.GetRequest{
Index: indexName,
DocumentID: docID,
}

res, err := getRequest.Do(ctx, client)
if err != nil {
return nil, err
}
defer res.Body.Close()

if res.IsError() {
return nil, err
}

var doc map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&doc); err != nil {
return nil, err
}

return doc, nil
}

func deleteIndex(client *elasticsearch.Client, indexName string) {
ctx := context.Background()
deleteIndexRequest := esapi.IndicesDeleteRequest{
Index: []string{indexName},
}

res, err := deleteIndexRequest.Do(ctx, client)
if err != nil {
log.Fatalf("无法删除索引:%v", err)
}
defer res.Body.Close()

if res.IsError() {
log.Fatalf("删除索引失败:%s", res.String())
}

log.Printf("索引 %s 删除成功", indexName)
}

使用 Bulk 进行批量操作

Bulk 是 Elasticsearch 提供的一种批量操作 API,它允许您在单个请求中执行多个操作,例如创建、更新和删除文档。Bulk API 可以显著提高索引性能,因为它可以减少网络开销和请求延迟。

使用 Bulk API,可以将多个操作打包到一个请求中,然后将其发送到 Elasticsearch。每个操作都由一个 JSON 对象表示,该对象包含操作类型、文档 ID 和文档数据。Bulk API 支持以下操作类型:

  • index:创建新文档或替换现有文档。
  • update:更新现有文档的部分内容。
  • delete:删除现有文档。

批量创建操作

以下是一个使用 Bulk API 批量创建文档的示例:

import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/esutil"
)

func bulkCreateDocuments(es *elasticsearch.Client, indexName string, documents []map[string]interface{}) error {
// 构建 Bulk API 请求
var reqs []esutil.BulkIndexerItem
for _, doc := range documents {
req := esutil.BulkIndexerItem{
Action: "index",
Index: indexName,
Body: esutil.NewJSONReader(doc),
}
reqs = append(reqs, req)
}

// 执行 Bulk API 请求
res, err := esutil.BulkIndex(context.Background(), es, esutil.BulkIndex.WithIndex(indexName), esutil.BulkIndex.WithBody(esutil.NewJSONReader(reqs...)))
if err != nil {
return err
}
defer res.Body.Close()

// 检查响应状态码
if res.IsError() {
var errorResponse map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&errorResponse); err != nil {
return fmt.Errorf("Bulk API 返回错误状态码: %s", res.Status())
} else {
return fmt.Errorf("Bulk API 返回错误状态码: %s, 错误信息: %s", res.Status(), errorResponse["error"].(map[string]interface{})["reason"])
}
}

// 解析响应
var response map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return err
}

log.Printf("Bulk API 返回响应: %v", response)

return nil
}

这个 documents 参数是一个包含要创建的文档的 map 切片。

批量更新操作

documents 参数是一个包含要更新的文档 ID 和要更新的字段和值的 map。可以将其传递给 Bulk API 请求的 Body 字段。注意,由于这里使用了 doc 字段来更新文档,因此这里需要将要更新的字段和值包装在一个名为 doc 的 map 中。

import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/esutil"
)

func bulkUpdateDocuments(es *elasticsearch.Client, indexName string, documents map[string]map[string]interface{}) error {
// 构建 Bulk API 请求
var reqs []esutil.BulkIndexerItem
for id, doc := range documents {
req := esutil.BulkIndexerItem{
Action: "update",
Index: indexName,
Id: id,
Body: esutil.NewJSONReader(map[string]interface{}{"doc": doc}),
}
reqs = append(reqs, req)
}

// 执行 Bulk API 请求
res, err := esutil.BulkIndex(context.Background(), es, esutil.BulkIndex.WithIndex(indexName), esutil.BulkIndex.WithBody(esutil.NewJSONReader(reqs...)))
if err != nil {
return err
}
defer res.Body.Close()

// 检查响应状态码
if res.IsError() {
var errorResponse map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&errorResponse); err != nil {
return fmt.Errorf("Bulk API 返回错误状态码: %s", res.Status())
} else {
return fmt.Errorf("Bulk API 返回错误状态码: %s, 错误信息: %s", res.Status(), errorResponse["error"].(map[string]interface{})["reason"])
}
}

// 解析响应
var response map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return err
}

log.Printf("Bulk API 返回响应: %v", response)

return nil
}